From: pavelxdd Date: Mon, 20 Aug 2018 21:46:51 +0000 (+0300) Subject: Implement named pipe support X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~8^2~88^2~4 X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/success//%22http:/www.example.com/cgi/success/?a=commitdiff_plain;h=144af84a58da60070f809b72246a6303dfde20b2;p=siridb-server.git Implement named pipe support --- diff --git a/Debug/src/siri/net/subdir.mk b/Debug/src/siri/net/subdir.mk index 098b149f..c6b2b666 100644 --- a/Debug/src/siri/net/subdir.mk +++ b/Debug/src/siri/net/subdir.mk @@ -10,7 +10,8 @@ C_SRCS += \ ../src/siri/net/promise.c \ ../src/siri/net/promises.c \ ../src/siri/net/protocol.c \ -../src/siri/net/socket.c +../src/siri/net/socket.c \ +../src/siri/net/pipe.c OBJS += \ ./src/siri/net/bserver.o \ @@ -19,7 +20,8 @@ OBJS += \ ./src/siri/net/promise.o \ ./src/siri/net/promises.o \ ./src/siri/net/protocol.o \ -./src/siri/net/socket.o +./src/siri/net/socket.o \ +./src/siri/net/pipe.o C_DEPS += \ ./src/siri/net/bserver.d \ @@ -28,7 +30,8 @@ C_DEPS += \ ./src/siri/net/promise.d \ ./src/siri/net/promises.d \ ./src/siri/net/protocol.d \ -./src/siri/net/socket.d +./src/siri/net/socket.d \ +./src/siri/net/pipe.d # Each subdirectory must supply rules for building sources it contributes @@ -38,5 +41,3 @@ src/siri/net/%.o: ../src/siri/net/%.c gcc -DDEBUG=1 -I../include -O0 -g3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" @echo 'Finished building: $<' @echo ' ' - - diff --git a/Release/src/siri/net/subdir.mk b/Release/src/siri/net/subdir.mk index c5ef8a3b..54657d92 100644 --- a/Release/src/siri/net/subdir.mk +++ b/Release/src/siri/net/subdir.mk @@ -10,7 +10,8 @@ C_SRCS += \ ../src/siri/net/promise.c \ ../src/siri/net/promises.c \ ../src/siri/net/protocol.c \ -../src/siri/net/socket.c +../src/siri/net/socket.c \ +../src/siri/net/pipe.c OBJS += \ ./src/siri/net/bserver.o \ @@ -19,7 +20,8 @@ OBJS += \ ./src/siri/net/promise.o \ ./src/siri/net/promises.o \ ./src/siri/net/protocol.o \ -./src/siri/net/socket.o +./src/siri/net/socket.o \ +./src/siri/net/pipe.o C_DEPS += \ ./src/siri/net/bserver.d \ @@ -28,7 +30,8 @@ C_DEPS += \ ./src/siri/net/promise.d \ ./src/siri/net/promises.d \ ./src/siri/net/protocol.d \ -./src/siri/net/socket.d +./src/siri/net/socket.d \ +./src/siri/net/pipe.d # Each subdirectory must supply rules for building sources it contributes @@ -38,5 +41,3 @@ src/siri/net/%.o: ../src/siri/net/%.c gcc -I../include -O3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" @echo 'Finished building: $<' @echo ' ' - - diff --git a/include/siri/cfg/cfg.h b/include/siri/cfg/cfg.h index 446520fc..51b0d4b0 100644 --- a/include/siri/cfg/cfg.h +++ b/include/siri/cfg/cfg.h @@ -23,6 +23,8 @@ typedef struct siri_cfg_s uint8_t shard_compression; char server_address[SIRI_CFG_MAX_LEN_ADDRESS]; char default_db_path[SIRI_PATH_MAX]; + uint8_t pipe_support; + char pipe_client_name[SIRI_PATH_MAX]; } siri_cfg_t; void siri_cfg_init(siri_t * siri); diff --git a/include/siri/net/clserver.h b/include/siri/net/clserver.h index 4bf7b81d..12dd4f6d 100644 --- a/include/siri/net/clserver.h +++ b/include/siri/net/clserver.h @@ -12,10 +12,67 @@ #pragma once #include +#include +#include #include typedef struct siri_s siri_t; +#define sirinet_client_incref(client) \ +switch ((client)->type) \ +{ \ +case UV_TCP: \ + sirinet_socket_incref(client); \ + break; \ +case UV_NAMED_PIPE: \ + sirinet_pipe_incref(client); \ + break; \ +default: \ + break; \ +} + +#define sirinet_client_decref(client) \ +switch ((client)->type) \ +{ \ +case UV_TCP: \ + sirinet_socket_decref(client); \ + break; \ +case UV_NAMED_PIPE: \ + sirinet_pipe_decref(client); \ + break; \ +default: \ + uv_close((uv_handle_t *) (client), NULL); \ + break; \ +} + +#define CLIENT_SIRIDB(client, siridb) \ +siridb_t * siridb = NULL; \ +switch ((client)->type) \ +{ \ +case UV_TCP: \ + siridb = ((sirinet_socket_t *) (client)->data)->siridb; \ + break; \ +case UV_NAMED_PIPE: \ + siridb = ((sirinet_pipe_t *) (client)->data)->siridb; \ + break; \ +default: \ + break; \ +} + +#define CLIENT_USER(client, user) \ +siridb_user_t * user = NULL; \ +switch ((client)->type) \ +{ \ +case UV_TCP: \ + user = (siridb_user_t *) ((sirinet_socket_t *) (client)->data)->origin; \ + break; \ +case UV_NAMED_PIPE: \ + user = (siridb_user_t *) ((sirinet_pipe_t *) (client)->data)->origin; \ + break; \ +default: \ + break; \ +} + int sirinet_clserver_init(siri_t * siri); typedef ssize_t (*sirinet_clserver_getfile)(char ** buffer, siridb_t * siridb); diff --git a/include/siri/net/pipe.h b/include/siri/net/pipe.h new file mode 100644 index 00000000..8851c82e --- /dev/null +++ b/include/siri/net/pipe.h @@ -0,0 +1,57 @@ +#pragma once + +#include +#include +#include +#include + +#define PIPE_NAME_SZ SIRI_PATH_MAX +#define RESET_BUF_SIZE 1048576 /* 1 MB */ + +typedef enum sirinet_pipe_tp +{ + PIPE_CLIENT, + PIPE_BACKEND +} sirinet_pipe_tp_t; + +typedef struct siridb_s siridb_t; +typedef struct siridb_user_s siridb_user_t; + +typedef void (* on_data_cb_t)(uv_stream_t * client, sirinet_pkg_t * pkg); +typedef void (* on_free_cb_t)(uv_stream_t * client); + +typedef struct sirinet_pipe_s +{ + sirinet_pipe_tp_t tp; + uint32_t ref; + on_data_cb_t on_data; + on_free_cb_t on_free; + siridb_t * siridb; + void * origin; /* can be a user, server or NULL */ + char * buf; + size_t len; + size_t size; + uv_pipe_t pipe; +} sirinet_pipe_t; + +uv_pipe_t * sirinet_pipe_new( + sirinet_pipe_tp_t tp, + on_data_cb_t cb_data, + on_free_cb_t cb_free); +void sirinet_pipe_alloc_buffer( + uv_handle_t * handle, + size_t suggested_size, + uv_buf_t * buf); +int sirinet_pipe_name(char * buffer, uv_stream_t * client); +void sirinet_pipe_on_data( + uv_stream_t * client, + ssize_t nread, + const uv_buf_t * buf); +void sirinet__pipe_free(uv_stream_t * client); + +#define sirinet_pipe_incref(client) \ + ((sirinet_pipe_t *) client->data)->ref++ + +#define sirinet_pipe_decref(client) \ + if (!--((sirinet_pipe_t *) client->data)->ref) \ + uv_close((uv_handle_t *) client, (uv_close_cb) sirinet__pipe_free) diff --git a/siridb.conf b/siridb.conf index a2e89908..7e6c665f 100644 --- a/siridb.conf +++ b/siridb.conf @@ -60,7 +60,17 @@ heartbeat_interval = 30 max_open_files = 32768 # -# Use shard compression for storing data points. +# Use shard compression for storing data points. # Set value 0 to disable shard compression. # enable_shard_compression = 1 + +# +# Enable named pipe support for client connections. +# +enable_pipe_support = 0 + +# +# SiriDB will bind the client named pipe in this location. +# +pipe_client_name = siridb_client.sock diff --git a/src/siri/cfg/cfg.c b/src/siri/cfg/cfg.c index 9bacac0c..832c5f6c 100644 --- a/src/siri/cfg/cfg.c +++ b/src/siri/cfg/cfg.c @@ -29,7 +29,9 @@ static siri_cfg_t siri_cfg = { .ip_support=IP_SUPPORT_ALL, .shard_compression=0, .server_address="localhost", - .default_db_path="/var/lib/siridb/" + .default_db_path="/var/lib/siridb/", + .pipe_support=0, + .pipe_client_name="siridb_client.sock" }; static void SIRI_CFG_read_uint( @@ -47,10 +49,15 @@ static void SIRI_CFG_read_addr( cfgparser_t * cfgparser, const char * option_name, char ** dest); +static void SIRI_CFG_read_pipe_name( + cfgparser_t * cfgparser, + const char * option_name, + char * dest); static void SIRI_CFG_read_default_db_path(cfgparser_t * cfgparser); static void SIRI_CFG_read_max_open_files(cfgparser_t * cfgparser); static void SIRI_CFG_read_ip_support(cfgparser_t * cfgparser); static void SIRI_CFG_read_shard_compression(cfgparser_t * cfgparser); +static void SIRI_CFG_read_pipe_support(cfgparser_t * cfgparser); void siri_cfg_init(siri_t * siri) { @@ -119,6 +126,16 @@ void siri_cfg_init(siri_t * siri) "bind_server_address", &siri_cfg.bind_backend_addr); + SIRI_CFG_read_pipe_support(cfgparser); + + if (siri_cfg.pipe_support) + { + SIRI_CFG_read_pipe_name( + cfgparser, + "pipe_client_name", + &siri_cfg.pipe_client_name); + } + cfgparser_free(cfgparser); } @@ -273,6 +290,40 @@ static void SIRI_CFG_read_shard_compression(cfgparser_t * cfgparser) } +static void SIRI_CFG_read_pipe_support(cfgparser_t * cfgparser) +{ + cfgparser_option_t * option; + cfgparser_return_t rc; + rc = cfgparser_get_option( + &option, + cfgparser, + "siridb", + "enable_pipe_support"); + if (rc != CFGPARSER_SUCCESS) + { + log_debug( + "Missing '%s' in '%s': %s. " + "Disable pipe support", + "enable_pipe_support", + siri.args->config, + cfgparser_errmsg(rc)); + } + else if (option->tp != CFGPARSER_TP_INTEGER || option->val->integer > 1) + { + log_warning( + "Error reading '%s' in '%s': %s. " + "Disable pipe support", + "enable_pipe_support", + siri.args->config, + "error: expecting 0 or 1"); + } + else if (option->val->integer == 1) + { + siri_cfg.pipe_support = 1; + } + +} + static void SIRI_CFG_read_addr( cfgparser_t * cfgparser, const char * option_name, @@ -317,6 +368,62 @@ static void SIRI_CFG_read_addr( } } +static void SIRI_CFG_read_pipe_name( + cfgparser_t * cfgparser, + const char * option_name, + char * dest) +{ + cfgparser_option_t * option; + cfgparser_return_t rc; + size_t len; + rc = cfgparser_get_option( + &option, + cfgparser, + "siridb", + option_name); + if (rc != CFGPARSER_SUCCESS) + { + log_warning( + "Error reading '%s' in '%s': %s. " + "Using default value: '%s'", + option_name, + siri.args->config, + cfgparser_errmsg(rc), + dest); + } + else if (option->tp != CFGPARSER_TP_STRING) + { + log_warning( + "Error reading '%s' in '%s': %s. " + "Using default value: '%s'", + option_name, + siri.args->config, + "error: expecting a string value", + dest); + } + else + { + *dest = 0; + + /* keep space left for a terminator char */ + strncpy(dest, + option->val->string, + SIRI_PATH_MAX - 1); + + len = strlen(dest); + + if (len == SIRI_PATH_MAX - 1) + { + log_warning( + "Default '%s' path exceeds %d characters, please " + "check your configuration file: %s", + option_name, + SIRI_PATH_MAX - 2, + siri.args->config); + } + } +} + static void SIRI_CFG_read_default_db_path(cfgparser_t * cfgparser) { cfgparser_option_t * option; diff --git a/src/siri/db/auth.c b/src/siri/db/auth.c index 377575b9..863243d1 100644 --- a/src/siri/db/auth.c +++ b/src/siri/db/auth.c @@ -58,8 +58,18 @@ cproto_server_t siridb_auth_user_request( return CPROTO_ERR_AUTH_CREDENTIALS; } - ((sirinet_socket_t *) client->data)->siridb = siridb; - ((sirinet_socket_t *) client->data)->origin = user; + switch (client->type) + { + case UV_TCP: + ((sirinet_socket_t *) client->data)->siridb = siridb; + ((sirinet_socket_t *) client->data)->origin = user; + break; + case UV_NAMED_PIPE: + ((sirinet_pipe_t *) client->data)->siridb = siridb; + ((sirinet_pipe_t *) client->data)->origin = user; + break; + } + siridb_user_incref(user); return CPROTO_RES_AUTH_SUCCESS; @@ -116,8 +126,17 @@ bproto_server_t siridb_auth_server_request( return BPROTO_AUTH_ERR_UNKNOWN_UUID; } - ((sirinet_socket_t *) client->data)->siridb = siridb; - ((sirinet_socket_t *) client->data)->origin = server; + switch (client->type) + { + case UV_TCP: + ((sirinet_socket_t *) client->data)->siridb = siridb; + ((sirinet_socket_t *) client->data)->origin = server; + break; + case UV_NAMED_PIPE: + ((sirinet_pipe_t *) client->data)->siridb = siridb; + ((sirinet_pipe_t *) client->data)->origin = server; + break; + } free(server->version); server->version = strdup((const char *) qp_version->via.raw); @@ -127,4 +146,3 @@ bproto_server_t siridb_auth_server_request( return BPROTO_AUTH_SUCCESS; } - diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index 571831a8..714ee503 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include #include @@ -280,7 +280,7 @@ int siridb_insert_points_to_pools(siridb_insert_t * insert, size_t npoints) insert->npoints= npoints; /* increment the client reference counter */ - sirinet_socket_incref(insert->client); + sirinet_client_incref(insert->client); uv_async_init(siri.loop, handle, INSERT_points_to_pools); handle->data = (void *) insert; @@ -341,7 +341,7 @@ int insert_init_backend_local( } qp_unpacker_init(&ilocal->unpacker, promise->pkg->data, promise->pkg->len); - sirinet_socket_incref(client); + sirinet_client_incref(client); promise->data = client; promise->cb = (sirinet_promise_cb) INSERT_local_promise_backend_cb; @@ -376,8 +376,7 @@ static void INSERT_on_response(slist_t * promises, uv_async_t * handle) sirinet_pkg_t * pkg; sirinet_promise_t * promise; siridb_insert_t * insert = (siridb_insert_t *) handle->data; - siridb_t * siridb = - ((sirinet_socket_t *) insert->client->data)->siridb; + CLIENT_SIRIDB(insert->client, siridb) int n = 0; char msg[MAX_INSERT_MSG]; @@ -998,7 +997,7 @@ static void INSERT_local_promise_backend_cb( { sirinet_pkg_send(client, pkg); } - sirinet_socket_decref(client); + sirinet_client_decref(client); sirinet_promise_decref(promise); } @@ -1085,7 +1084,8 @@ static int INSERT_init_local( static void INSERT_points_to_pools(uv_async_t * handle) { siridb_insert_t * insert = (siridb_insert_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) insert->client->data)->siridb; + CLIENT_SIRIDB(insert->client, siridb) + uint16_t pool = siridb->server->pool; sirinet_pkg_t * pkg, * repl_pkg; sirinet_promises_t * promises = sirinet_promises_new( @@ -1472,7 +1472,7 @@ static void INSERT_free(uv_handle_t * handle) siridb_insert_t * insert = (siridb_insert_t *) handle->data; /* decrement the client reference counter */ - sirinet_socket_decref(insert->client); + sirinet_client_decref(insert->client); /* free insert */ siridb_insert_free(insert); @@ -1481,5 +1481,3 @@ static void INSERT_free(uv_handle_t * handle) free((uv_async_t *) handle); } - - diff --git a/src/siri/db/query.c b/src/siri/db/query.c index 55fa4ae1..2dd700a6 100644 --- a/src/siri/db/query.c +++ b/src/siri/db/query.c @@ -23,7 +23,7 @@ #include #include #include -#include +#include #include #include #include @@ -70,7 +70,6 @@ void siridb_query_run( float factor, int flags) { - siridb_t * siridb; uv_async_t * handle = (uv_async_t *) malloc(sizeof(uv_async_t)); if (handle == NULL) { @@ -104,7 +103,7 @@ void siridb_query_run( query->pid = pid; /* increment client reference counter */ - sirinet_socket_incref(client); + sirinet_client_incref(client); query->client = client; query->flags = flags; @@ -132,8 +131,9 @@ void siridb_query_run( log_debug("Parsing query (%d): %s", query->flags, query->q); } + CLIENT_SIRIDB(query->client, siridb) + /* increment active tasks */ - siridb = ((sirinet_socket_t *) query->client->data)->siridb; siridb_tasks_inc(siridb->tasks); /* send next call */ @@ -145,7 +145,7 @@ void siridb_query_run( void siridb_query_free(uv_handle_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) /* decrement active tasks */ siridb_tasks_dec(siridb->tasks); @@ -174,7 +174,7 @@ void siridb_query_free(uv_handle_t * handle) } /* decrement client reference counter */ - sirinet_socket_decref(query->client); + sirinet_client_decref(query->client); /* free query */ free(query); @@ -252,7 +252,7 @@ void siridb_query_forward( int flags) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) /* * the size is important here, we will use the alloc_size to guess the @@ -534,7 +534,8 @@ static void QUERY_send_no_query(uv_async_t * handle) #ifndef DEBUG /* production version returns timestamp now */ - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) + qp_add_raw(query->packer, (const unsigned char *) "calc", 4); uint64_t ts = siridb_time_now(siridb, query->start); @@ -564,7 +565,8 @@ static void QUERY_parse(uv_async_t * handle) { int rc; siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) + siridb_walker_t * walker = siridb_walker_new( siridb, siridb_time_now(siridb, query->start), @@ -654,8 +656,10 @@ static int QUERY_to_packer(qp_packer_t * packer, siridb_query_t * query) char buffer[packer->alloc_size]; size_t size = packer->alloc_size; + CLIENT_SIRIDB(query->client, siridb) + rc = QUERY_rebuild( - ((sirinet_socket_t *) query->client->data)->siridb, + siridb, query->pr->tree->children->node, buffer, &size, diff --git a/src/siri/net/bserver.c b/src/siri/net/bserver.c index 7e2dd4ef..5fb6fb5c 100644 --- a/src/siri/net/bserver.c +++ b/src/siri/net/bserver.c @@ -764,4 +764,3 @@ static void on_disable_backup_mode(uv_stream_t * client, sirinet_pkg_t * pkg) sirinet_pkg_send(client, package); } } - diff --git a/src/siri/net/clserver.c b/src/siri/net/clserver.c index 8c4e75b8..dcfc3b2e 100644 --- a/src/siri/net/clserver.c +++ b/src/siri/net/clserver.c @@ -31,7 +31,6 @@ #include #include #include -#include #include #include #include @@ -49,11 +48,10 @@ const unsigned long int WARNING_PKG_SIZE = RESET_BUF_SIZE; */ #define MAX_QUERY_PKG_SIZE 65535 - #define DEFAULT_BACKLOG 128 -#define CHECK_SIRIDB(ssocket) \ -sirinet_socket_t * ssocket = client->data; \ -if (ssocket->siridb == NULL) \ +#define CHECK_SIRIDB(client, siridb) \ +CLIENT_SIRIDB(client, siridb) \ +if ((siridb) == NULL) \ { \ sirinet_pkg_t * package; \ package = sirinet_pkg_new(pkg->pid, 0, CPROTO_ERR_NOT_AUTHENTICATED, NULL);\ @@ -69,10 +67,15 @@ static const int SERVER_RUNNING_REINDEXING = static uv_loop_t * loop = NULL; static struct sockaddr_storage client_addr; -static uv_tcp_t client_server; +static uv_tcp_t client_server_tcp; +static uv_pipe_t client_server_pipe; static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg); -static void on_new_connection(uv_stream_t * server, int status); +static void on_tcp_data(uv_stream_t * client, sirinet_pkg_t * pkg); +static void on_tcp_new_connection(uv_stream_t * server, int status); +static void on_pipe_data(uv_stream_t * client, sirinet_pkg_t * pkg); +static void on_pipe_free(uv_stream_t * client); +static void on_pipe_new_connection(uv_stream_t * server, int status); static void on_auth_request(uv_stream_t * client, sirinet_pkg_t * pkg); static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg); static void on_insert(uv_stream_t * client, sirinet_pkg_t * pkg); @@ -116,10 +119,11 @@ int sirinet_clserver_init(siri_t * siri) /* bind loop to the given loop */ loop = siri->loop; - uv_tcp_init(loop, &client_server); + uv_tcp_init(loop, &client_server_tcp); + uv_pipe_init(loop, &client_server_pipe, 0); /* make sure data is set to NULL so we later on can check this value. */ - client_server.data = NULL; + client_server_tcp.data = NULL; if (siri->cfg->bind_client_addr != NULL) { @@ -155,40 +159,75 @@ int sirinet_clserver_init(siri_t * siri) (struct sockaddr_in *) &client_addr); } - uv_tcp_bind( - &client_server, + rc = uv_tcp_bind( + &client_server_tcp, (const struct sockaddr *) &client_addr, (siri->cfg->ip_support == IP_SUPPORT_IPV6ONLY) ? UV_TCP_IPV6ONLY : 0); + if (rc) + { + log_error("Error binding TCP client server: %s", uv_strerror(rc)); + return 1; + } + rc = uv_listen( - (uv_stream_t*) &client_server, + (uv_stream_t*) &client_server_tcp, DEFAULT_BACKLOG, - on_new_connection); + on_tcp_new_connection); if (rc) { - log_error("Error listening client server: %s", uv_strerror(rc)); + log_error("Error listening TCP client server: %s", uv_strerror(rc)); return 1; } - log_info("Start listening for client connections on port %d", + log_info("Start listening for TCP client connections on port %d", siri->cfg->listen_client_port); + if (siri->cfg->pipe_support) + { + char *pipe_name = siri->cfg->pipe_client_name; + + rc = uv_pipe_bind( + &client_server_pipe, + pipe_name); + + if (rc) + { + log_error("Error binding pipe client server: %s", uv_strerror(rc)); + return 1; + } + + rc = uv_listen( + (uv_stream_t*) &client_server_pipe, + DEFAULT_BACKLOG, + on_pipe_new_connection); + + if (rc) + { + log_error("Error listening TCP client server: %s", uv_strerror(rc)); + return 1; + } + + log_info("Start listening for pipe client connections on '%s'", + pipe_name); + } + return 0; } -static void on_new_connection(uv_stream_t * server, int status) +static void on_tcp_new_connection(uv_stream_t * server, int status) { - log_debug("Received a client connection request."); + log_debug("Received a TCP client connection request."); if (status < 0) { - log_error("Client connection error: %s", uv_strerror(status)); + log_error("TCP client connection error: %s", uv_strerror(status)); return; } uv_tcp_t * client = - sirinet_socket_new(SOCKET_CLIENT, (on_data_cb_t) &on_data); + sirinet_socket_new(SOCKET_CLIENT, (on_data_cb_t) &on_tcp_data); if (client != NULL) { @@ -208,37 +247,41 @@ static void on_new_connection(uv_stream_t * server, int status) } } -static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg) +static void on_pipe_new_connection(uv_stream_t * server, int status) { - if (Logger.level == LOGGER_DEBUG) + log_debug("Received a pipe client connection request."); + + if (status < 0) { - char addr_port[ADDR_BUF_SZ]; - if (sirinet_addr_and_port(addr_port, client) == 0) - { - log_debug( - "Package received from client '%s' " - "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)", - addr_port, - pkg->pid, - pkg->len, - sirinet_cproto_client_str(pkg->tp)); - } + log_error("Pipe client connection error: %s", uv_strerror(status)); + return; } - else if (pkg->len >= WARNING_PKG_SIZE) + uv_pipe_t * client = + sirinet_pipe_new( + PIPE_CLIENT, + (on_data_cb_t) &on_pipe_data, + (on_free_cb_t) &on_pipe_free); + + if (client != NULL) { - char addr_port[ADDR_BUF_SZ]; - if (sirinet_addr_and_port(addr_port, client) == 0) + uv_pipe_init(loop, client, 0); + + if (uv_accept(server, (uv_stream_t *) client) == 0) { - log_warning( - "Got a large package from '%s' (pid: %d, len: %d, tp: %s)." - " A package size smaller than 1MB is recommended!", - addr_port, - pkg->pid, - pkg->len, - sirinet_cproto_client_str(pkg->tp)); + uv_read_start( + (uv_stream_t *) client, + sirinet_pipe_alloc_buffer, + sirinet_pipe_on_data); + } + else + { + sirinet_pipe_decref(client); } } +} +static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg) +{ /* in case the online flag is not set, we cannot perform any request */ if (siri.status == SIRI_STATUS_RUNNING) { @@ -284,14 +327,94 @@ static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg) } else { - /* data->siridb can be NULL here, make sure we can handle this state */ + CLIENT_SIRIDB(client, siridb) + + /* siridb can be NULL here, make sure we can handle this state */ CLSERVER_send_server_error( - ((sirinet_socket_t *) client->data)->siridb, + siridb, client, pkg); } } +static void on_tcp_data(uv_stream_t * client, sirinet_pkg_t * pkg) +{ + if (Logger.level == LOGGER_DEBUG) + { + char addr_port[ADDR_BUF_SZ]; + if (sirinet_addr_and_port(addr_port, client) == 0) + { + log_debug( + "Package received from client '%s' " + "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)", + addr_port, + pkg->pid, + pkg->len, + sirinet_cproto_client_str(pkg->tp)); + } + } + else if (pkg->len >= WARNING_PKG_SIZE) + { + char addr_port[ADDR_BUF_SZ]; + if (sirinet_addr_and_port(addr_port, client) == 0) + { + log_warning( + "Got a large package from '%s' (pid: %d, len: %d, tp: %s)." + " A package size smaller than 1MB is recommended!", + addr_port, + pkg->pid, + pkg->len, + sirinet_cproto_client_str(pkg->tp)); + } + } + + on_data(client, pkg); +} + +static void on_pipe_data(uv_stream_t * client, sirinet_pkg_t * pkg) +{ + if (Logger.level == LOGGER_DEBUG) + { + char pipe_name[PIPE_NAME_SZ]; + if (sirinet_pipe_name(pipe_name, client) == 0) + { + log_debug( + "Package received from client '%s' " + "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)", + pipe_name, + pkg->pid, + pkg->len, + sirinet_cproto_client_str(pkg->tp)); + } + } + else if (pkg->len >= WARNING_PKG_SIZE) + { + char pipe_name[PIPE_NAME_SZ]; + if (sirinet_pipe_name(pipe_name, client) == 0) + { + log_warning( + "Got a large package from '%s' (pid: %d, len: %d, tp: %s)." + " A package size smaller than 1MB is recommended!", + pipe_name, + pkg->pid, + pkg->len, + sirinet_cproto_client_str(pkg->tp)); + } + } + + on_data(client, pkg); +} + +static void on_pipe_free(uv_stream_t * client) +{ + char pipe_name[PIPE_NAME_SZ]; + if (sirinet_pipe_name(pipe_name, client) == 0) + { + uv_fs_t req; + uv_fs_unlink(loop, &req, pipe_name, NULL); + } +} + static void on_auth_request(uv_stream_t * client, sirinet_pkg_t * pkg) { cproto_server_t rc; @@ -395,7 +518,7 @@ static void CLSERVER_send_pool_error( static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg) { - CHECK_SIRIDB(ssocket) + CHECK_SIRIDB(client, siridb) if (pkg->len > MAX_QUERY_PKG_SIZE) { @@ -436,12 +559,12 @@ static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg) if (qp_time_precision.tp == QP_INT64 && (tp = (siridb_timep_t) qp_time_precision.via.int64) != - ssocket->siridb->time->precision) + siridb->time->precision) { tp %= SIRIDB_TIME_END; } factor = (tp == SIRIDB_TIME_DEFAULT) ? 0.0 : - pow(1000.0, tp - ssocket->siridb->time->precision); + pow(1000.0, tp - siridb->time->precision); siridb_query_run( pkg->pid, @@ -459,12 +582,13 @@ static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg) static void on_insert(uv_stream_t * client, sirinet_pkg_t * pkg) { - CHECK_SIRIDB(ssocket) + CHECK_SIRIDB(client, siridb) + CLIENT_USER(client, siridb_user) char err_msg[SIRIDB_MAX_SIZE_ERR_MSG]; if (!siridb_user_check_access( - (siridb_user_t *) ssocket->origin, + siridb_user, SIRIDB_ACCESS_INSERT, err_msg)) { @@ -486,8 +610,6 @@ static void on_insert(uv_stream_t * client, sirinet_pkg_t * pkg) return; } - siridb_t * siridb = ssocket->siridb; - /* only when when the flag is EXACTLY running or * running + re-indexing we can continue */ if ( siridb->server->flags != SERVER_FLAG_RUNNING && @@ -661,9 +783,9 @@ static void on_reqfile( sirinet_pkg_t * pkg, sirinet_clserver_getfile getfile) { - CHECK_SIRIDB(ssocket) + CHECK_SIRIDB(client, siridb) + CLIENT_USER(client, siridb_user) - siridb_t * siridb = ssocket->siridb; sirinet_pkg_t * package = NULL; char err_msg[SIRIDB_MAX_SIZE_ERR_MSG]; @@ -681,7 +803,7 @@ static void on_reqfile( err_msg); } else if (!siridb_user_check_access( - (siridb_user_t *) ssocket->origin, + siridb_user, SIRIDB_ACCESS_PROFILE_FULL, err_msg)) { @@ -720,9 +842,9 @@ static void on_reqfile( */ static void on_register_server(uv_stream_t * client, sirinet_pkg_t * pkg) { - CHECK_SIRIDB(ssocket) + CHECK_SIRIDB(client, siridb) + CLIENT_USER(client, siridb_user) - siridb_t * siridb = ssocket->siridb; sirinet_pkg_t * package = NULL; siridb_server_t * new_server = NULL; char err_msg[SIRIDB_MAX_SIZE_ERR_MSG]; @@ -754,7 +876,7 @@ static void on_register_server(uv_stream_t * client, sirinet_pkg_t * pkg) err_msg); } else if (!siridb_user_check_access( - (siridb_user_t *) ssocket->origin, + siridb_user, SIRIDB_ACCESS_PROFILE_FULL, err_msg)) { @@ -828,7 +950,7 @@ static void on_register_server(uv_stream_t * client, sirinet_pkg_t * pkg) if (servers != NULL && (package = sirinet_pkg_dup(pkg)) != NULL) { /* make sure to decrement the client in the callback */ - sirinet_socket_incref(client); + sirinet_client_incref(client); siridb_servers_send_pkg( servers, @@ -988,9 +1110,8 @@ static void CLSERVER_on_register_server_response( } /* decref the client */ - sirinet_socket_decref(server_reg->client); + sirinet_client_decref(server_reg->client); /* free server register object */ free(server_reg); } - diff --git a/src/siri/net/pipe.c b/src/siri/net/pipe.c new file mode 100644 index 00000000..fd8c7c78 --- /dev/null +++ b/src/siri/net/pipe.c @@ -0,0 +1,245 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MAX_ALLOWED_PKG_SIZE 20971520 /* 20 MB */ + +#define QUIT_PIPE \ + free(spipe->buf); \ + spipe->buf = NULL; \ + spipe->len = 0; \ + spipe->size = 0; \ + spipe->on_data = NULL; \ + sirinet_pipe_decref(client); \ + return; + +/* + * This function can raise a SIGNAL. + */ +void sirinet_pipe_alloc_buffer( + uv_handle_t * handle, + size_t suggested_size, + uv_buf_t * buf) +{ + sirinet_pipe_t * spipe = (sirinet_pipe_t *) handle->data; + + if (!spipe->len && spipe->size > RESET_BUF_SIZE) + { + free(spipe->buf); + spipe->buf = (char *) malloc(suggested_size); + if (spipe->buf == NULL) + { + ERR_ALLOC + buf->len = 0; + return; + } + spipe->size = suggested_size; + spipe->len = 0; + } + buf->base = spipe->buf + spipe->len; + buf->len = spipe->size - spipe->len; +} + +/* + * Buffer should have a size of PIPE_NAME_SZ + * + * Return 0 if successful or -1 in case of an error. + */ +int sirinet_pipe_name(char * buffer, uv_stream_t * client) +{ + size_t len = PIPE_NAME_SZ - 1; + + if (uv_pipe_getsockname( + (uv_pipe_t *) client, + buffer, + &len)) + { + return -1; + } + + buffer[len] = 0; + return 0; +} + +/* + * This function can raise a SIGNAL. + */ +void sirinet_pipe_on_data( + uv_stream_t * client, + ssize_t nread, + const uv_buf_t * buf) +{ + sirinet_pipe_t * spipe = (sirinet_pipe_t *) client->data; + sirinet_pkg_t * pkg; + size_t total_sz; + uint8_t check; + + /* + * spipe->on_data is NULL when 'sirinet_pipe_decref' is called from + * within this function. We should never call 'sirinet_pipe_decref' twice + * so the best thing is to log and and exit this function. + */ + if (spipe->on_data == NULL) + { + char pipe_name[PIPE_NAME_SZ]; + if (sirinet_pipe_name(pipe_name, client) == 0) + { + log_error( + "Received data from '%s' but we ignore the data since the " + "connection will be closed in a few seconds...", + pipe_name); + } + return; + } + + if (nread < 0) + { + if (nread != UV_EOF) + { + log_error("Read error: %s", uv_err_name(nread)); + } + QUIT_PIPE + } + + spipe->len += nread; + + if (spipe->len < sizeof(sirinet_pkg_t)) + { + return; + } + + pkg = (sirinet_pkg_t *) spipe->buf; + check = pkg->tp ^ 255; + if ( check != pkg->checkbit || + (spipe->tp == PIPE_CLIENT && pkg->len > MAX_ALLOWED_PKG_SIZE)) + { + char pipe_name[PIPE_NAME_SZ]; + if (sirinet_pipe_name(pipe_name, client) == 0) + { + log_error( + "Got an illegal package or size too large from '%s', " + "closing connection " + "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %" PRIu8 ")", + pipe_name, pkg->pid, pkg->len, pkg->tp); + } + QUIT_PIPE + } + + total_sz = sizeof(sirinet_pkg_t) + pkg->len; + if (spipe->len < total_sz) + { + if (spipe->size < total_sz) + { + char * tmp = realloc(spipe->buf, total_sz); + if (tmp == NULL) + { + log_critical( + "Cannot allocate size for package " + "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %" PRIu8 ")", + pkg->pid, pkg->len, pkg->tp); + QUIT_PIPE + } + spipe->buf = tmp; + spipe->size = total_sz; + } + return; + } + + /* call on-data function */ + (*spipe->on_data)(client, pkg); + + spipe->len -= total_sz; + + if (spipe->len > 0) + { + /* move data and call sirinet_pipe_on_data() function again */ + memmove(spipe->buf, spipe->buf + total_sz, spipe->len); + sirinet_pipe_on_data(client, 0, buf); + } +} + +/* + * Returns NULL and raises a SIGNAL in case an error has occurred. + * + * Note: ((sirinet_pipe_t *) pipe->data)->ref is initially set to 1 + */ +uv_pipe_t * sirinet_pipe_new( + sirinet_pipe_tp_t tp, + on_data_cb_t cb_data, + on_free_cb_t cb_free) +{ + sirinet_pipe_t * spipe = + (sirinet_pipe_t *) malloc(sizeof(sirinet_pipe_t)); + + if (spipe == NULL) + { + ERR_ALLOC + return NULL; + } + + spipe->tp = tp; + spipe->on_data = cb_data; + spipe->on_free = cb_free; + spipe->buf = NULL; + spipe->len = 0; + spipe->size = -1; /* this will force allocating on first request */ + spipe->origin = NULL; + spipe->siridb = NULL; + spipe->ref = 1; + spipe->pipe.data = spipe; + + return &spipe->pipe; +} + +/* + * Never use this function but call sirinet_pipe_decref. + * Destroy pipe. (parsing NULL is not allowed) + * + * We know three different pipe types: + * - client: used for clients. a user object might be destroyed. + * - back-end: used to connect to other servers. a server might be destroyed. + * - server: user for severs connecting to here. a server might be destroyed. + * + * In case a server is destroyed, remaining promises will be cancelled and + * the call-back functions will be called. + */ +void sirinet__pipe_free(uv_stream_t * client) +{ + sirinet_pipe_t * spipe = client->data; + +#if DEBUG + log_debug("Free pipe type: %d", spipe->tp); +#endif + + switch (spipe->tp) + { + case PIPE_CLIENT: /* listens to client connections */ + if (spipe->origin != NULL) + { + siridb_user_t * user = (siridb_user_t *) spipe->origin; + siridb_user_decref(user); + } + break; + case PIPE_BACKEND: /* listens to server connections */ + if (spipe->origin != NULL) + { + siridb_server_t * server = (siridb_server_t *) spipe->origin; + siridb_server_decref(server); + } + break; + } + + if (spipe->on_free != NULL) + { + spipe->on_free(client); + } + + free(spipe->buf); + free(spipe); +} diff --git a/src/siri/net/pkg.c b/src/siri/net/pkg.c index 3dd00609..bb6d9348 100644 --- a/src/siri/net/pkg.c +++ b/src/siri/net/pkg.c @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include @@ -167,7 +167,7 @@ int sirinet_pkg_send(uv_stream_t * client, sirinet_pkg_t * pkg) } /* increment client reference counter */ - sirinet_socket_incref(client); + sirinet_client_incref(client); data->client = client; data->pkg = pkg; @@ -213,7 +213,7 @@ static void PKG_write_cb(uv_write_t * req, int status) pkg_send_t * data = (pkg_send_t *) req->data; - sirinet_socket_decref(data->client); + sirinet_client_decref(data->client); free(data->pkg); free(data); diff --git a/src/siri/parser/listener.c b/src/siri/parser/listener.c index c24f8f02..44f3b92c 100644 --- a/src/siri/parser/listener.c +++ b/src/siri/parser/listener.c @@ -35,7 +35,7 @@ #include #include #include -#include +#include #include #include #include @@ -381,10 +381,10 @@ else \ /* * Start SIRIPARSER_MASTER_CHECK_ACCESS */ -#define SIRIPARSER_MASTER_CHECK_ACCESS(ACCESS_BIT) \ +#define SIRIPARSER_MASTER_CHECK_ACCESS(user, ACCESS_BIT) \ if (IS_MASTER && \ !siridb_user_check_access( \ - (siridb_user_t *) ((sirinet_socket_t *) query->client->data)->origin, \ + user, \ ACCESS_BIT, \ query->err_msg)) \ { \ @@ -512,7 +512,7 @@ static void enter_access_expr(uv_async_t * handle) static void enter_alter_group(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_alter_t * q_alter = (query_alter_t *) query->data; MASTER_CHECK_ACCESSIBLE(siridb) @@ -545,7 +545,7 @@ static void enter_alter_group(uv_async_t * handle) static void enter_alter_server(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_alter_t * q_alter = (query_alter_t *) query->data; siridb_server_t * server = siridb_server_from_node( siridb, @@ -578,7 +578,8 @@ static void enter_alter_stmt(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_ALTER) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_ALTER) #if DEBUG assert (query->packer == NULL); @@ -607,7 +608,7 @@ static void enter_alter_stmt(uv_async_t * handle) static void enter_alter_user(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) MASTER_CHECK_ACCESSIBLE(siridb) @@ -641,7 +642,8 @@ static void enter_count_stmt(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_COUNT) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_COUNT) #if DEBUG assert (query->packer == NULL); @@ -672,7 +674,8 @@ static void enter_create_stmt(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_CREATE) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_CREATE) SIRIPARSER_NEXT_NODE } @@ -712,7 +715,8 @@ static void enter_drop_stmt(uv_async_t * handle) assert (query->packer == NULL); #endif - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_DROP) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_DROP) query->packer = sirinet_packer_new(1024); @@ -738,9 +742,9 @@ static void enter_drop_stmt(uv_async_t * handle) static void enter_grant_user(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; - - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_GRANT) + CLIENT_SIRIDB(query->client, siridb) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_GRANT) MASTER_CHECK_ACCESSIBLE(siridb) cleri_node_t * user_node = @@ -778,7 +782,7 @@ static void enter_grant_user(uv_async_t * handle) static void enter_group_match(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) cleri_node_t * node = query->nodes->node; query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data; @@ -870,7 +874,7 @@ static void enter_help(uv_async_t * handle) static void enter_limit_expr(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_list_t * qlist = (query_list_t *) query->data; int64_t limit = query->nodes->node->children->next->node->result; @@ -896,7 +900,8 @@ static void enter_list_stmt(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_LIST) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_LIST) #if DEBUG assert (query->packer == NULL); @@ -960,9 +965,9 @@ static void enter_merge_as(uv_async_t * handle) static void enter_revoke_user(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; - - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_REVOKE) + CLIENT_SIRIDB(query->client, siridb) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_REVOKE) MASTER_CHECK_ACCESSIBLE(siridb) cleri_node_t * user_node = @@ -1004,12 +1009,13 @@ static void enter_revoke_user(uv_async_t * handle) static void enter_select_stmt(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_select_t * q_select; cleri_children_t * child; int skip_get_points; - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_SELECT) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_SELECT) MASTER_CHECK_ACCESSIBLE(siridb) #if DEBUG @@ -1064,7 +1070,7 @@ static void enter_select_stmt(uv_async_t * handle) static void enter_set_expression(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) cleri_node_t * node = query->nodes->node->children->next->next->node; query_alter_t * q_alter = (query_alter_t *) query->data; @@ -1100,7 +1106,7 @@ static void enter_set_ignore_threshold(uv_async_t * handle) static void enter_set_name(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) cleri_node_t * name_node = query->nodes->node->children->next->next->node; @@ -1167,7 +1173,7 @@ static void enter_series_name(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; cleri_node_t * node = query->nodes->node; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data; siridb_series_t * series = NULL; uint16_t pool; @@ -1324,7 +1330,7 @@ static void enter_series_match(uv_async_t * handle) static void enter_series_all(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) siridb_series_t * series; query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data; @@ -1383,7 +1389,7 @@ static void enter_series_all(uv_async_t * handle) static void enter_series_re(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) cleri_node_t * node = query->nodes->node; query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data; @@ -1561,9 +1567,9 @@ static void exit_after_expr(uv_async_t * handle) static void exit_alter_group(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; + CLIENT_SIRIDB(query->client, siridb) - if (siridb_groups_save( - ((sirinet_socket_t *) query->client->data)->siridb->groups)) + if (siridb_groups_save(siridb->groups)) { FILE_ERR_RET } @@ -1590,8 +1596,9 @@ static void exit_alter_group(uv_async_t * handle) static void exit_alter_user(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; + CLIENT_SIRIDB(query->client, siridb) - if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb)) + if (siridb_users_save(siridb)) { FILE_ERR_RET } @@ -1687,7 +1694,7 @@ static void exit_calc_stmt(uv_async_t * handle) static void exit_count_groups(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; if (q_count->where_expr == NULL || !cexpr_contains( @@ -1720,7 +1727,7 @@ static void exit_count_groups(uv_async_t * handle) static void exit_count_pools(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; siridb_pool_t * pool = siridb->pools->pool + siridb->server->pool; @@ -1765,7 +1772,7 @@ static void exit_count_pools(uv_async_t * handle) static void exit_count_series(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; MASTER_CHECK_ONLINE(siridb) @@ -1829,7 +1836,7 @@ static void exit_count_series(uv_async_t * handle) static void exit_count_series_length(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; MASTER_CHECK_ACCESSIBLE(siridb) @@ -1916,7 +1923,7 @@ static void exit_count_series_length(uv_async_t * handle) static void exit_count_servers(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; cexpr_t * where_expr = q_count->where_expr; cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb; @@ -1972,7 +1979,7 @@ static void exit_count_servers(uv_async_t * handle) static void exit_count_servers_received(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; cexpr_t * where_expr = q_count->where_expr; cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb; @@ -2006,7 +2013,7 @@ static void exit_count_servers_received(uv_async_t * handle) static void exit_count_servers_selected(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; cexpr_t * where_expr = q_count->where_expr; cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb; @@ -2040,7 +2047,7 @@ static void exit_count_servers_selected(uv_async_t * handle) static void exit_count_shards(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; qp_add_raw(query->packer, (const unsigned char *) "shards", 6); @@ -2111,7 +2118,7 @@ static void exit_count_shards(uv_async_t * handle) static void exit_count_shards_size(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_count_t * q_count = (query_count_t *) query->data; uint64_t duration; size_t i; @@ -2174,7 +2181,7 @@ static void exit_count_shards_size(uv_async_t * handle) static void exit_count_users(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) llist_node_t * node = siridb->users->first; cexpr_t * where_expr = ((query_count_t *) query->data)->where_expr; cexpr_cb_t cb = (cexpr_cb_t) siridb_user_cexpr_cb; @@ -2199,7 +2206,7 @@ static void exit_count_users(uv_async_t * handle) static void exit_create_group(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) cleri_node_t * name_nd = query->nodes->node->children->next->node; @@ -2263,7 +2270,7 @@ static void exit_create_group(uv_async_t * handle) static void exit_create_user(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) siridb_user_t * user = ((query_alter_t *) query->data)->via.user; cleri_node_t * user_node = query->nodes->node->children->next->node; @@ -2331,7 +2338,7 @@ static void exit_create_user(uv_async_t * handle) static void exit_drop_group(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) MASTER_CHECK_ACCESSIBLE(siridb) @@ -2370,7 +2377,7 @@ static void exit_drop_group(uv_async_t * handle) static void exit_drop_series(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_drop_t * q_drop = (query_drop_t *) query->data; MASTER_CHECK_ACCESSIBLE(siridb) @@ -2481,7 +2488,7 @@ static void exit_drop_series(uv_async_t * handle) static void exit_drop_server(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) siridb_server_t * server = siridb_server_from_node( siridb, query->nodes->node->children->next->node->children->node, @@ -2550,7 +2557,7 @@ static void exit_drop_server(uv_async_t * handle) static void exit_drop_shards(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_drop_t * q_drop = (query_drop_t *) query->data; MASTER_CHECK_ACCESSIBLE(siridb) @@ -2651,7 +2658,7 @@ static void exit_drop_shards(uv_async_t * handle) static void exit_drop_user(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) MASTER_CHECK_ACCESSIBLE(siridb) @@ -2662,7 +2669,7 @@ static void exit_drop_user(uv_async_t * handle) strx_extract_string(username, user_node->str, user_node->len); if (siridb_users_drop_user( - ((sirinet_socket_t *) query->client->data)->siridb, + siridb, username, query->err_msg)) { @@ -2692,8 +2699,9 @@ static void exit_drop_user(uv_async_t * handle) static void exit_grant_user(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; + CLIENT_SIRIDB(query->client, siridb) - if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb)) + if (siridb_users_save(siridb)) { sprintf(query->err_msg, "Could not write users to file!"); log_critical(query->err_msg); @@ -2773,7 +2781,7 @@ static void exit_help_xxx(uv_async_t * handle) static void exit_list_groups(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_list_t * q_list = (query_list_t *) query->data; int is_local = (q_list->props == NULL); @@ -2830,7 +2838,7 @@ static void exit_list_groups(uv_async_t * handle) static void exit_list_pools(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) query_list_t * q_list = (query_list_t *) query->data; siridb_pool_t * pool = siridb->pools->pool + siridb->server->pool; siridb_pool_walker_t wpool = { @@ -2912,7 +2920,7 @@ static void exit_list_series(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; query_list_t * q_list = (query_list_t *) query->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) if (q_list->props == NULL) { @@ -2965,7 +2973,8 @@ static void exit_list_series(uv_async_t * handle) static void exit_list_servers(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) + query_list_t * q_list = (query_list_t *) query->data; cexpr_t * where_expr = q_list->where_expr; int is_local = IS_MASTER; @@ -3049,7 +3058,8 @@ static void exit_list_servers(uv_async_t * handle) static void exit_list_shards(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) + query_list_t * q_list = (query_list_t *) query->data; uint_fast16_t prop; uint64_t duration; @@ -3183,8 +3193,9 @@ static void exit_list_shards(uv_async_t * handle) static void exit_list_users(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - llist_node_t * node = - ((sirinet_socket_t *) query->client->data)->siridb->users->first; + CLIENT_SIRIDB(query->client, siridb) + + llist_node_t * node = siridb->users->first; slist_t * props = ((query_list_t *) query->data)->props; cexpr_cb_t cb = (cexpr_cb_t) siridb_user_cexpr_cb; query_list_t * q_list = (query_list_t *) query->data; @@ -3242,8 +3253,9 @@ static void exit_list_users(uv_async_t * handle) static void exit_revoke_user(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; + CLIENT_SIRIDB(query->client, siridb) - if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb)) + if (siridb_users_save(siridb)) { sprintf(query->err_msg, "Could not write users to file!"); log_critical(query->err_msg); @@ -3519,9 +3531,9 @@ static void exit_select_stmt(uv_async_t * handle) static void exit_set_address(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; siridb_server_t * server = ((query_alter_t *) query->data)->via.server; cleri_node_t * node = query->nodes->node->children->next->next->node; + CLIENT_SIRIDB(query->client, siridb) if (siridb->server == server || server->socket != NULL) { @@ -3563,7 +3575,7 @@ static void exit_set_address(uv_async_t * handle) static void exit_set_backup_mode(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) #if DEBUG assert (query->data != NULL); @@ -3664,7 +3676,7 @@ static void exit_set_backup_mode(uv_async_t * handle) static void exit_set_drop_threshold(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) MASTER_CHECK_ACCESSIBLE(siridb) @@ -3725,7 +3737,7 @@ static void exit_set_drop_threshold(uv_async_t * handle) static void exit_set_list_limit(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) MASTER_CHECK_ACCESSIBLE(siridb) MASTER_CHECK_VERSION(siridb, "2.0.17") @@ -3788,8 +3800,8 @@ static void exit_set_list_limit(uv_async_t * handle) static void exit_set_log_level(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; query_alter_t * q_alter = (query_alter_t *) query->data; + CLIENT_SIRIDB(query->client, siridb) #if DEBUG assert (query->data != NULL); @@ -3931,9 +3943,9 @@ static void exit_set_log_level(uv_async_t * handle) static void exit_set_port(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; siridb_server_t * server = ((query_alter_t *) query->data)->via.server; cleri_node_t * node = query->nodes->node->children->next->next->node; + CLIENT_SIRIDB(query->client, siridb) if (siridb->server == server || server->socket != NULL) { @@ -3984,7 +3996,7 @@ static void exit_set_port(uv_async_t * handle) static void exit_set_select_points_limit(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) MASTER_CHECK_ACCESSIBLE(siridb) MASTER_CHECK_VERSION(siridb, "2.0.17") @@ -4047,8 +4059,8 @@ static void exit_set_select_points_limit(uv_async_t * handle) static void exit_set_timezone(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; cleri_node_t * node = query->nodes->node->children->next->next->node; + CLIENT_SIRIDB(query->client, siridb) MASTER_CHECK_ACCESSIBLE(siridb) @@ -4112,8 +4124,9 @@ static void exit_set_timezone(uv_async_t * handle) static void exit_show_stmt(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - - SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_SHOW) + CLIENT_SIRIDB(query->client, siridb) + CLIENT_USER(query->client, siridb_user) + SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_SHOW) cleri_children_t * children = query->nodes->node->children->next->node->children; @@ -4134,7 +4147,7 @@ static void exit_show_stmt(uv_async_t * handle) qp_add_raw(query->packer, (const unsigned char *) "data", 4); qp_add_type(query->packer, QP_ARRAY_OPEN); - siridb_user_t * user = ((sirinet_socket_t *) query->client->data)->origin; + CLIENT_USER(query->client, user) who_am_i = user->name; if (children->node == NULL) @@ -4148,8 +4161,7 @@ static void exit_show_stmt(uv_async_t * handle) { continue; } - prop_cb(((sirinet_socket_t *) query->client->data)->siridb, - query->packer, 1); + prop_cb(siridb, query->packer, 1); } } else @@ -4163,8 +4175,7 @@ static void exit_show_stmt(uv_async_t * handle) #if DEBUG assert (prop_cb != NULL); /* all props are implemented */ #endif - prop_cb(((sirinet_socket_t *) query->client->data)->siridb, - query->packer, 1); + prop_cb(siridb, query->packer, 1); if (children->next == NULL) { @@ -4184,9 +4195,10 @@ static void exit_show_stmt(uv_async_t * handle) static void exit_timeit_stmt(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; + CLIENT_SIRIDB(query->client, siridb) + struct timespec end; - char * name = - ((sirinet_socket_t *) query->client->data)->siridb->server->name; + char * name = siridb->server->name; clock_gettime(CLOCK_REALTIME, &end); qp_add_type(query->timeit, QP_MAP2); @@ -4328,8 +4340,8 @@ static void async_count_series_length(uv_async_t * handle) static void async_drop_series(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; query_drop_t * q_drop = (query_drop_t *) query->data; + CLIENT_SIRIDB(query->client, siridb) siridb_series_t * series; uint8_t async_more = 0; @@ -4386,6 +4398,7 @@ static void async_drop_shards(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; query_drop_t * q_drop = (query_drop_t *) query->data; + CLIENT_SIRIDB(query->client, siridb) if (q_drop->shards_list->len) { @@ -4394,7 +4407,7 @@ static void async_drop_shards(uv_async_t * handle) siridb_shard_drop( shard, - ((sirinet_socket_t *) query->client->data)->siridb); + siridb); siridb_shard_decref(shard); } @@ -4585,7 +4598,7 @@ static void async_no_points_aggregate(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; query_select_t * q_select = (query_select_t *) query->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) uint8_t async_more = 0; siridb_series_t * series; siridb_points_t * points; @@ -4725,7 +4738,7 @@ static void async_select_aggregate(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; query_select_t * q_select = (query_select_t *) query->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) uint8_t async_more = 0; siridb_series_t * series; siridb_points_t * points; @@ -5274,7 +5287,7 @@ static void on_groups_response(slist_t * promises, uv_async_t * handle) sirinet_promise_t * promise; qp_unpacker_t unpacker; siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) siridb_group_t * group; qp_obj_t qp_name; qp_obj_t qp_series; @@ -5418,7 +5431,7 @@ static void on_select_response(slist_t * promises, uv_async_t * handle) sirinet_promise_t * promise; qp_unpacker_t unpacker; siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) size_t err_count = 0; query_select_t * q_select = (query_select_t *) query->data; qp_obj_t qp_name; @@ -5647,7 +5660,7 @@ static void master_select_work(uv_work_t * work) uv_async_t * handle = (uv_async_t *) work->data; siridb_query_t * query = (siridb_query_t *) handle->data; query_select_t * q_select = (query_select_t *) query->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + CLIENT_SIRIDB(query->client, siridb) siridb->selected_points += q_select->n; int rc = ct_items( q_select->result, @@ -6027,8 +6040,8 @@ static int values_count_groups(siridb_group_t * group, uv_async_t * handle) static void finish_list_groups(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; query_list_t * q_list = (query_list_t *) query->data; + CLIENT_SIRIDB(query->client, siridb) if (q_list->props == NULL) { @@ -6060,8 +6073,8 @@ static void finish_list_groups(uv_async_t * handle) static void finish_count_groups(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; - siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; query_count_t * q_count = (query_count_t *) query->data; + CLIENT_SIRIDB(query->client, siridb) /* Note: ct_values(..values_count_groups..) can only result in a positive * value. diff --git a/src/siri/siri.c b/src/siri/siri.c index 932eee26..389a32aa 100644 --- a/src/siri/siri.c +++ b/src/siri/siri.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -491,8 +492,6 @@ static void SIRI_walk_close_handlers( switch (handle->type) { - case UV_WORK: - break; case UV_SIGNAL: /* this is where we cleanup the signal handlers */ uv_close(handle, NULL); @@ -512,6 +511,20 @@ static void SIRI_walk_close_handlers( } break; + case UV_NAMED_PIPE: + /* This can be a pipe server with data set to NULL or a SiriDB pipe + * which should be destroyed. + */ + if (handle->data == NULL) + { + uv_close(handle, NULL); + } + else + { + sirinet_pipe_decref(handle); + } + break; + case UV_TIMER: /* we do not expect any timer object since they should all be closed * (or at least closing) at this point.